# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# The ERB statement below logs a deprecation warning when this template is rendered;
# it contributes no pipeline configuration of its own.
<% deprecation_logger.deprecated("The Netflow module has been deprecated in favor of the Beats Netflow module and may be removed in a future release. Learn more about the Beats Netflow module at https://www.elastic.co/guide/en/beats/filebeat/master/filebeat-module-netflow.html") %>
# Receive Netflow export packets over UDP and decode them with the netflow codec.
# Every tunable is read from a var.input.udp.* module setting; the second argument
# passed to setting() is presumably the value used when the setting is absent
# (TODO confirm against the module helper).
input {
    udp {
        # Mark every event produced by this input with type "netflow".
        type => "netflow"
        port => <%= setting("var.input.udp.port", 2055) %>
        # Only Netflow v5 and v9 are decoded; the filter section normalizes exactly
        # these two versions.
        codec => netflow {
            versions => [5,9]
        }
        workers => <%= setting("var.input.udp.workers", 2) %>
        receive_buffer_bytes => <%= setting("var.input.udp.receive_buffer_bytes", 212992) %>
        queue_size => <%= setting("var.input.udp.queue_size", 2000) %>
    }
}

filter {
    #--------------------
    # The field names for Netflow v5 and v9 differ. We need to normalize to a
    # common data model so that we can work with the resulting events in a common manner.
    #--------------------

    # Process Netflow v5 events.
    # Every v5 event is stamped with a fixed direction ("ingress") and IP version
    # ("IPv4"), and the v5 field names are renamed to the common names that the
    # version-independent post-processing further down relies on.
    if [netflow][version] == 5 {
        mutate {
            id => "netflow-v5-normalize"
            add_field => {
                "[netflow][direction]" => "ingress"
                "[netflow][ip_version]" => "IPv4"
            }
            # Left-hand side: field name as decoded; right-hand side: normalized name.
            rename => {
                "[netflow][ipv4_src_addr]" => "[netflow][src_addr]"
                "[netflow][src_mask]" => "[netflow][src_mask_len]"
                "[netflow][l4_src_port]" => "[netflow][src_port]"
                "[netflow][ipv4_dst_addr]" => "[netflow][dst_addr]"
                "[netflow][dst_mask]" => "[netflow][dst_mask_len]"
                "[netflow][l4_dst_port]" => "[netflow][dst_port]"
                "[netflow][in_bytes]" => "[netflow][bytes]"
                "[netflow][in_pkts]" => "[netflow][packets]"
                "[netflow][ipv4_next_hop]" => "[netflow][next_hop]"
                "[netflow][src_tos]" => "[netflow][tos]"
            }
            # Swap the numeric version for a human-readable label.
            replace => {
                "[netflow][version]" => "Netflow v5"
            }
        }
    }
    # Process Netflow v9 events.
    else if [netflow][version] == 9 {
        # Swap the numeric version for a human-readable label.
        mutate {
            id => "netflow-v9-normalize-version"
            replace => { "[netflow][version]" => "Netflow v9" }
        }

        # When the record carries ip_protocol_version, derive ip_version from it by
        # prefixing the value with "IPv" (e.g. 4 becomes "IPv4").
        if [netflow][ip_protocol_version] {
            mutate {
                id => "netflow-v9-normalize-ip_version"
                add_field => { "[netflow][ip_version]" => "IPv%{[netflow][ip_protocol_version]}" }
            }
        }
        
        # Populate fields with IPv4 or IPv6 specific fields.
        # The IPv4 branch is taken when either IPv4 address field is present or
        # ip_protocol_version is 4; the IPv6 branch is only evaluated when the IPv4
        # test fails. Events matching neither are tagged and left unchanged.
        # Inside each branch every rename is guarded by an existence check on its
        # own source field.
        if [netflow][ipv4_src_addr] or [netflow][ipv4_dst_addr] or [netflow][ip_protocol_version] == 4 {
            if [netflow][ipv4_src_addr] {
                mutate {
                    id => "netflow-v9-normalize-src_addr-from-ipv4_src_addr"
                    rename => { "[netflow][ipv4_src_addr]" => "[netflow][src_addr]" }
                }
            }
            # The IPv4 mask fields carry no "ipv4_" prefix, unlike the address fields.
            if [netflow][src_mask] {
                mutate {
                    id => "netflow-v9-normalize-src_mask_len-from-src_mask"
                    rename => { "[netflow][src_mask]" => "[netflow][src_mask_len]" }
                }
            }
            if [netflow][ipv4_dst_addr] {
                mutate {
                    id => "netflow-v9-normalize-dst_addr-from-ipv4_dst_addr"
                    rename => { "[netflow][ipv4_dst_addr]" => "[netflow][dst_addr]" }
                }
            }
            if [netflow][dst_mask] {
                mutate {
                    id => "netflow-v9-normalize-dst_mask_len-from-dst_mask"
                    rename => { "[netflow][dst_mask]" => "[netflow][dst_mask_len]" }
                }
            }
            if [netflow][ipv4_next_hop] {
                mutate {
                    id => "netflow-v9-normalize-next_hop-from-ipv4_next_hop"
                    rename => { "[netflow][ipv4_next_hop]" => "[netflow][next_hop]" }
                }
            }
        } else if [netflow][ipv6_src_addr] or [netflow][ipv6_dst_addr] or [netflow][ip_protocol_version] == 6 {
            # IPv6 counterpart of the branch above; here the mask fields do carry
            # the "ipv6_" prefix.
            if [netflow][ipv6_src_addr] {
                mutate {
                    id => "netflow-v9-normalize-src_addr-from-ipv6_src_addr"
                    rename => { "[netflow][ipv6_src_addr]" => "[netflow][src_addr]" }
                }
            }
            if [netflow][ipv6_src_mask] {
                mutate {
                    id => "netflow-v9-normalize-src_mask_len-from-ipv6_src_mask"
                    rename => { "[netflow][ipv6_src_mask]" => "[netflow][src_mask_len]" }
                }
            }
            if [netflow][ipv6_dst_addr] {
                mutate {
                    id => "netflow-v9-normalize-dst_addr-from-ipv6_dst_addr"
                    rename => { "[netflow][ipv6_dst_addr]" => "[netflow][dst_addr]" }
                }
            }
            if [netflow][ipv6_dst_mask] {
                mutate {
                    id => "netflow-v9-normalize-dst_mask_len-from-ipv6_dst_mask"
                    rename => { "[netflow][ipv6_dst_mask]" => "[netflow][dst_mask_len]" }
                }
            }
            if [netflow][ipv6_next_hop] {
                mutate {
                    id => "netflow-v9-normalize-next_hop-from-ipv6_next_hop"
                    rename => { "[netflow][ipv6_next_hop]" => "[netflow][next_hop]" }
                }
            }
        } else {
            # Did not recognize IP version.
            mutate {
                id => "netflow-ip-version-not-recognized"
                add_tag => [ "__netflow_ip_version_not_recognized" ]
            }
        }
        
        # Translate the numeric flow direction into a label: 0 is rewritten to
        # "ingress" and 1 to "egress". For any other value the field is left as-is
        # and the event is tagged so the unrecognized direction can be found later.
        if [netflow][direction] == 0 {
            mutate {
                id => "netflow-v9-normalize-direction-ingress"
                replace => {
                    "[netflow][direction]" => "ingress"
                }
            }
        } else if [netflow][direction] == 1 {
            mutate {
                id => "netflow-v9-normalize-direction-egress"
                replace => {
                    "[netflow][direction]" => "egress"
                }
            }
        } else {
            mutate {
                id => "netflow-v9-normalize-direction-not-recognized"
                add_tag => [
                    "__netflow_direction_not_recognized"
                ]
            }
        }

        # Normalize the source port into [netflow][src_port].
        # The first field found wins, in this order: l4_src_port, tcp_src_port,
        # udp_src_port. Fields further down the list are left untouched once an
        # earlier one has matched.
        if [netflow][l4_src_port] {
            mutate {
                id => "netflow-v9-normalize-src_port_from_l4_src_port"
                rename => {
                    "[netflow][l4_src_port]" => "[netflow][src_port]"
                }
            }
        } else if [netflow][tcp_src_port] {
            mutate {
                id => "netflow-v9-normalize-src_port_from_tcp_src_port"
                rename => {
                    "[netflow][tcp_src_port]" => "[netflow][src_port]"
                }
            }
        } else if [netflow][udp_src_port] {
            mutate {
                id => "netflow-v9-normalize-src_port_from_udp_src_port"
                rename => {
                    "[netflow][udp_src_port]" => "[netflow][src_port]"
                }
            }
        }

        # Populate destination port.
        # The first field found wins, in this order: l4_dst_port, tcp_dst_port,
        # udp_dst_port. Each branch tests the very field it renames.
        # The UDP branch must test udp_dst_port rather than udp_src_port: the
        # source-port step above may already have renamed udp_src_port away, and a
        # record may carry one of the two fields without the other, so testing the
        # source field would leave udp_dst_port un-normalized.
        if [netflow][l4_dst_port] {
            mutate {
                id => "netflow-v9-normalize-dst_port_from_l4_dst_port"
                rename => { "[netflow][l4_dst_port]" => "[netflow][dst_port]" }
            }
        } else if [netflow][tcp_dst_port] {
            mutate {
                id => "netflow-v9-normalize-dst_port_from_tcp_dst_port"
                rename => { "[netflow][tcp_dst_port]" => "[netflow][dst_port]" }
            }
        } else if [netflow][udp_dst_port] {
            mutate {
                id => "netflow-v9-normalize-dst_port_from_udp_dst_port"
                rename => { "[netflow][udp_dst_port]" => "[netflow][dst_port]" }
            }
        }

        # Normalize the byte counter into [netflow][bytes].
        # The first field found wins, in this order: in_bytes, out_bytes,
        # in_permanent_bytes.
        if [netflow][in_bytes] {
            mutate {
                id => "netflow-v9-normalize-bytes-from-in_bytes"
                rename => {
                    "[netflow][in_bytes]" => "[netflow][bytes]"
                }
            }
        } else if [netflow][out_bytes] {
            mutate {
                id => "netflow-v9-normalize-bytes-from-out_bytes"
                rename => {
                    "[netflow][out_bytes]" => "[netflow][bytes]"
                }
            }
        } else if [netflow][in_permanent_bytes] {
            mutate {
                id => "netflow-v9-normalize-bytes-from-in_permanent_bytes"
                rename => {
                    "[netflow][in_permanent_bytes]" => "[netflow][bytes]"
                }
            }
        }
        # Whatever the origin of the counter, make sure it ends up as an integer.
        if [netflow][bytes] {
            mutate {
                id => "netflow-v9-normalize-convert-bytes"
                convert => {
                    "[netflow][bytes]" => "integer"
                }
            }
        }
        
        # Normalize the packet counter into [netflow][packets].
        # The first field found wins, in this order: in_pkts, out_pkts,
        # in_permanent_pkts.
        if [netflow][in_pkts] {
            mutate {
                id => "netflow-v9-normalize-packets-from-in_pkts"
                rename => {
                    "[netflow][in_pkts]" => "[netflow][packets]"
                }
            }
        } else if [netflow][out_pkts] {
            mutate {
                id => "netflow-v9-normalize-packets-from-out_pkts"
                rename => {
                    "[netflow][out_pkts]" => "[netflow][packets]"
                }
            }
        } else if [netflow][in_permanent_pkts] {
            mutate {
                id => "netflow-v9-normalize-packets-from-in_permanent_pkts"
                rename => {
                    "[netflow][in_permanent_pkts]" => "[netflow][packets]"
                }
            }
        }
        # Whatever the origin of the counter, make sure it ends up as an integer.
        if [netflow][packets] {
            mutate {
                id => "netflow-v9-normalize-convert-packets"
                convert => {
                    "[netflow][packets]" => "integer"
                }
            }
        }

        # Normalize MAC addresses when the record carries them.
        # Source MAC: in_src_mac is preferred, out_src_mac is the fallback.
        if [netflow][in_src_mac] {
            mutate {
                id => "netflow-v9-normalize-src_mac-from-in_src_mac"
                rename => {
                    "[netflow][in_src_mac]" => "[netflow][src_mac]"
                }
            }
        } else if [netflow][out_src_mac] {
            mutate {
                id => "netflow-v9-normalize-src_mac-from-out_src_mac"
                rename => {
                    "[netflow][out_src_mac]" => "[netflow][src_mac]"
                }
            }
        }
        # Destination MAC: in_dst_mac is preferred, out_dst_mac is the fallback.
        if [netflow][in_dst_mac] {
            mutate {
                id => "netflow-v9-normalize-dst_mac-from-in_dst_mac"
                rename => {
                    "[netflow][in_dst_mac]" => "[netflow][dst_mac]"
                }
            }
        } else if [netflow][out_dst_mac] {
            mutate {
                id => "netflow-v9-normalize-dst_mac-from-out_dst_mac"
                rename => {
                    "[netflow][out_dst_mac]" => "[netflow][dst_mac]"
                }
            }
        }

        # Normalize the VLAN into [netflow][vlan] when the record carries one:
        # src_vlan is preferred, dst_vlan is only used when src_vlan is absent.
        if [netflow][src_vlan] {
            mutate {
                id => "netflow-v9-normalize-vlan-from-src_vlan"
                rename => {
                    "[netflow][src_vlan]" => "[netflow][vlan]"
                }
            }
        } else if [netflow][dst_vlan] {
            mutate {
                id => "netflow-v9-normalize-vlan-from-dst_vlan"
                rename => {
                    "[netflow][dst_vlan]" => "[netflow][vlan]"
                }
            }
        }

        # Normalize the type-of-service value: src_tos, when present, becomes
        # [netflow][tos].
        if [netflow][src_tos] {
            mutate {
                id => "netflow-v9-normalize-tos-from-src_tos"
                rename => {
                    "[netflow][src_tos]" => "[netflow][tos]"
                }
            }
        }
    }

    #--------------------
    # We now have a normalized flow record. The rest of the logic works
    # regardless of the Netflow version.
    #--------------------

    # Replace protocol ID with well-known name.
    if [netflow][protocol] {
        # Look up the protocol number in the IANA protocol-number dictionary shipped
        # under LOGSTASH_HOME and store the result in protocol_name. Numbers missing
        # from the dictionary yield "UNKNOWN". The original protocol field is kept
        # because the port-name lookups below still compare against the number.
        translate {
            id => "netflow-postproc-translate-protocol_name"
            dictionary_path => "<%= ::File.join(LogStash::Environment::LOGSTASH_HOME, "/modules/netflow/configuration/logstash/dictionaries/iana_protocol_numbers.yml") %>"
            field => "[netflow][protocol]"
            destination => "[netflow][protocol_name]"
            fallback => "UNKNOWN"
        }
        
        # Look up the IANA service name for the source and destination ports.
        # Each supported transport protocol (TCP, UDP, SCTP, DCCP) has its own
        # dictionary file; events with any other protocol get no port name at all.
        # Ports missing from the dictionary receive the "__UNKNOWN" sentinel, which
        # the formatting step that follows replaces with a plain label.
        if [netflow][protocol] == 6 { # TCP
            if [netflow][src_port] {
                translate {
                    id => "netflow-postproc-translate-src_port_name_tcp"
                    dictionary_path => "<%= ::File.join(LogStash::Environment::LOGSTASH_HOME, "/modules/netflow/configuration/logstash/dictionaries/iana_service_names_tcp.yml") %>"
                    field => "[netflow][src_port]"
                    destination => "[netflow][src_port_name]"
                    fallback => "__UNKNOWN"
                }
            }
            if [netflow][dst_port] {
                translate {
                    id => "netflow-postproc-translate-dst_port_name_tcp"
                    dictionary_path => "<%= ::File.join(LogStash::Environment::LOGSTASH_HOME, "/modules/netflow/configuration/logstash/dictionaries/iana_service_names_tcp.yml") %>"
                    field => "[netflow][dst_port]"
                    destination => "[netflow][dst_port_name]"
                    fallback => "__UNKNOWN"
                }
            }
        } else if [netflow][protocol] == 17 { # UDP
            if [netflow][src_port] {
                translate {
                    id => "netflow-postproc-translate-src_port_name_udp"
                    dictionary_path => "<%= ::File.join(LogStash::Environment::LOGSTASH_HOME, "/modules/netflow/configuration/logstash/dictionaries/iana_service_names_udp.yml") %>"
                    field => "[netflow][src_port]"
                    destination => "[netflow][src_port_name]"
                    fallback => "__UNKNOWN"
                }
            }
            if [netflow][dst_port] {
                translate {
                    id => "netflow-postproc-translate-dst_port_name_udp"
                    dictionary_path => "<%= ::File.join(LogStash::Environment::LOGSTASH_HOME, "/modules/netflow/configuration/logstash/dictionaries/iana_service_names_udp.yml") %>"
                    field => "[netflow][dst_port]"
                    destination => "[netflow][dst_port_name]"
                    fallback => "__UNKNOWN"
                }
            }
        } else if [netflow][protocol] == 132 { # SCTP
            if [netflow][src_port] {
                translate {
                    id => "netflow-postproc-translate-src_port_name_sctp"
                    dictionary_path => "<%= ::File.join(LogStash::Environment::LOGSTASH_HOME, "/modules/netflow/configuration/logstash/dictionaries/iana_service_names_sctp.yml") %>"
                    field => "[netflow][src_port]"
                    destination => "[netflow][src_port_name]"
                    fallback => "__UNKNOWN"
                }
            }
            if [netflow][dst_port] {
                translate {
                    id => "netflow-postproc-translate-dst_port_name_sctp"
                    dictionary_path => "<%= ::File.join(LogStash::Environment::LOGSTASH_HOME, "/modules/netflow/configuration/logstash/dictionaries/iana_service_names_sctp.yml") %>"
                    field => "[netflow][dst_port]"
                    destination => "[netflow][dst_port_name]"
                    fallback => "__UNKNOWN"
                }
            }
        } else if [netflow][protocol] == 33 { # DCCP
            if [netflow][src_port] {
                translate {
                    id => "netflow-postproc-translate-src_port_name_dccp"
                    dictionary_path => "<%= ::File.join(LogStash::Environment::LOGSTASH_HOME, "/modules/netflow/configuration/logstash/dictionaries/iana_service_names_dccp.yml") %>"
                    field => "[netflow][src_port]"
                    destination => "[netflow][src_port_name]"
                    fallback => "__UNKNOWN"
                }
            }
            if [netflow][dst_port] {
                translate {
                    id => "netflow-postproc-translate-dst_port_name_dccp"
                    dictionary_path => "<%= ::File.join(LogStash::Environment::LOGSTASH_HOME, "/modules/netflow/configuration/logstash/dictionaries/iana_service_names_dccp.yml") %>"
                    field => "[netflow][dst_port]"
                    destination => "[netflow][dst_port_name]"
                    fallback => "__UNKNOWN"
                }
            }
        }

        # Turn the looked-up service names into display labels.
        # A port whose lookup produced the "__UNKNOWN" sentinel is labelled
        # "<protocol_name>/<port>"; a port with a known service name is labelled
        # "<protocol_name>/<port> (<service name>)". Events without a port name
        # (no lookup was performed) are left untouched.
        if [netflow][src_port_name] == "__UNKNOWN" {
            mutate {
                id => "netflow-postproc-src_port_name_unknown"
                replace => {
                    "[netflow][src_port_name]" => "%{[netflow][protocol_name]}/%{[netflow][src_port]}"
                }
            }
        } else if [netflow][src_port_name] {
            mutate {
                id => "netflow-postproc-src_port_name-prepend-src_port"
                replace => {
                    "[netflow][src_port_name]" => "%{[netflow][protocol_name]}/%{[netflow][src_port]} (%{[netflow][src_port_name]})"
                }
            }
        }
        # Same treatment for the destination port.
        if [netflow][dst_port_name] == "__UNKNOWN" {
            mutate {
                id => "netflow-postproc-dst_port_name_unknown"
                replace => {
                    "[netflow][dst_port_name]" => "%{[netflow][protocol_name]}/%{[netflow][dst_port]}"
                }
            }
        } else if [netflow][dst_port_name] {
            mutate {
                id => "netflow-postproc-dest_port_name-prepend-dst_port"
                replace => {
                    "[netflow][dst_port_name]" => "%{[netflow][protocol_name]}/%{[netflow][dst_port]} (%{[netflow][dst_port_name]})"
                }
            }
        }
    }
    
    # Source and Destination IP address processing.
    if [netflow][dst_addr] or [netflow][src_addr] {
        # Start from the assumption that the flow is private. The address checks
        # below overwrite flow_locality with "public" as soon as either the
        # destination or the source address turns out not to be private.
        mutate {
            id => "netflow-postproc-flow_locality-default"
            add_field => {
                "[netflow][flow_locality]" => "private"
            }
        }

        # Classify the destination address as private or public, and enrich public
        # destinations with GeoIP city and ASN data under [geoip_dst].
        if [netflow][dst_addr] {
            # Check if destination IP address is private.
            # dst_locality is only added when the address falls inside one of the
            # listed networks.
            cidr {
                id => "netflow-postproc-cidr-dst_addr"
                address => [ "%{[netflow][dst_addr]}" ]
                network => [ "0.0.0.0/32", "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fc00::/7", "127.0.0.0/8", "::1/128","169.254.0.0/16", "fe80::/10","224.0.0.0/4", "ff00::/8","255.255.255.255/32" ]
                add_field => { "[netflow][dst_locality]" => "private" }
            }
            # Check to see if dst_locality exists. If it doesn't, the dst_addr didn't match a private address space and the locality must be public.
            if ![netflow][dst_locality] {
                # A public destination makes the whole flow public.
                mutate {
                    id => "netflow-postproc-dst_addr-public"
                    add_field => { "[netflow][dst_locality]" => "public" }
                    replace => { "[netflow][flow_locality]" => "public" }
                }
                # City and ASN lookups both write into [geoip_dst].
                geoip {
                    id => "netflow-postproc-geoip_dst-city"
                    source => "[netflow][dst_addr]"
                    default_database_type => "City"
                    target => "[geoip_dst]"
                }
                geoip {
                    id => "netflow-postproc-geoip_dst-asn"
                    source => "[netflow][dst_addr]"
                    default_database_type => "ASN"
                    target => "[geoip_dst]"
                }
                # Populate geoip_dst.autonomous_system.
                # Format: "<as_org> (<asn>)" when both are known, otherwise whichever
                # of the two is available; nothing is added when neither is.
                if [geoip_dst][as_org] {
                    if [geoip_dst][asn] {
                        mutate {
                            id => "netflow-postproc-dst-as-from-as_org-asn"
                            add_field => { "[geoip_dst][autonomous_system]" => "%{[geoip_dst][as_org]} (%{[geoip_dst][asn]})" }
                        }
                    } else {
                        mutate {
                            id => "netflow-postproc-dst-as-from-as_org"
                            add_field => { "[geoip_dst][autonomous_system]" => "%{[geoip_dst][as_org]}" }
                        }
                    }
                } else if [geoip_dst][asn] {
                    mutate {
                        id => "netflow-postproc-dst-as-from-asn"
                        add_field => { "[geoip_dst][autonomous_system]" => "%{[geoip_dst][asn]}" }
                    }
                }
            } else {
                # Private destinations get a fixed autonomous_system placeholder and
                # no GeoIP lookup.
                mutate {
                    id => "netflow-postproc-dst-as-private"
                    add_field => { "[geoip_dst][autonomous_system]" => "PRIVATE" }
                }
            }
        }

        # Classify the source address as private or public, and enrich public
        # sources with GeoIP city and ASN data under [geoip_src].
        if [netflow][src_addr] {
            # Check if source IP address is private.
            # src_locality is only added when the address falls inside one of the
            # listed networks.
            # NOTE(review): this list ends with "::", which the destination list does
            # not contain — confirm whether the difference is intentional.
            cidr {
                id => "netflow-postproc-cidr-src_addr"
                address => [ "%{[netflow][src_addr]}" ]
                network => [ "0.0.0.0/32", "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fc00::/7", "127.0.0.0/8", "::1/128", "169.254.0.0/16", "fe80::/10", "224.0.0.0/4", "ff00::/8", "255.255.255.255/32", "::" ]
                add_field => { "[netflow][src_locality]" => "private" }
            }
            # Check to see if src_locality exists. If it doesn't, the src_addr didn't match a private address space and the locality must be public.
            if ![netflow][src_locality] {
                # A public source makes the whole flow public.
                mutate {
                    id => "netflow-postproc-src_addr-public"
                    add_field => { "[netflow][src_locality]" => "public" }
                    replace => { "[netflow][flow_locality]" => "public" }
                }
                # City and ASN lookups both write into [geoip_src].
                geoip {
                    id => "netflow-postproc-geoip_src-city"
                    source => "[netflow][src_addr]"
                    default_database_type => "City"
                    target => "[geoip_src]"
                }
                geoip {
                    id => "netflow-postproc-geoip_src-asn"
                    source => "[netflow][src_addr]"
                    default_database_type => "ASN"
                    target => "[geoip_src]"
                }
                # Populate geoip_src.autonomous_system.
                # Format: "<as_org> (<asn>)" when both are known, otherwise whichever
                # of the two is available; nothing is added when neither is.
                if [geoip_src][as_org] {
                    if [geoip_src][asn] {
                        mutate {
                            id => "netflow-postproc-src-as-from-as_org-asn"
                            add_field => { "[geoip_src][autonomous_system]" => "%{[geoip_src][as_org]} (%{[geoip_src][asn]})" }
                        }
                    } else {
                        mutate {
                            id => "netflow-postproc-src-as-from-as_org"
                            add_field => { "[geoip_src][autonomous_system]" => "%{[geoip_src][as_org]}" }
                        }
                    }
                } else if [geoip_src][asn] {
                    mutate {
                        id => "netflow-postproc-src-as-from-asn"
                        add_field => { "[geoip_src][autonomous_system]" => "%{[geoip_src][asn]}" }
                    }
                }
            } else {
                # Private sources get a fixed autonomous_system placeholder and no
                # GeoIP lookup.
                mutate {
                    id => "netflow-postproc-src-as-private"
                    add_field => { "[geoip_src][autonomous_system]" => "PRIVATE" }
                }
            }
        }

        # Look up the geolocation of external traffic. Source IP has priority over destination IP.
        # This is a flow-level lookup in addition to the per-address ones above: only
        # one address is used — the source when it is public, otherwise the
        # destination when that is public. No target is configured here, so the
        # result presumably lands in the plugin's default [geoip] field, which is
        # what the autonomous_system step that follows reads (confirm against the
        # geoip plugin version in use).
        if [netflow][src_locality] == "public" {
            geoip {
                id => "netflow-postproc-geoip-city-src"
                source => "[netflow][src_addr]"
                default_database_type => "City"
            }
            geoip {
                id => "netflow-postproc-geoip-asn-src"
                source => "[netflow][src_addr]"
                default_database_type => "ASN"
            }
        } else if [netflow][dst_locality] == "public" {
            geoip {
                id => "netflow-postproc-geoip-city-dst"
                source => "[netflow][dst_addr]"
                default_database_type => "City"
            }
            geoip {
                id => "netflow-postproc-geoip-asn-dst"
                source => "[netflow][dst_addr]"
                default_database_type => "ASN"
            }
        }
        # Build geoip.autonomous_system from the ASN lookup results:
        # "<as_org> (<asn>)" when both values are present, otherwise whichever one
        # is available. Nothing is added when neither exists.
        if [geoip][as_org] {
            if [geoip][asn] {
                mutate {
                    id => "netflow-postproc-as-from-as_org-asn"
                    add_field => {
                        "[geoip][autonomous_system]" => "%{[geoip][as_org]} (%{[geoip][asn]})"
                    }
                }
            } else {
                mutate {
                    id => "netflow-postproc-as-from-as_org"
                    add_field => {
                        "[geoip][autonomous_system]" => "%{[geoip][as_org]}"
                    }
                }
            }
        } else if [geoip][asn] {
            mutate {
                id => "netflow-postproc-as-from-asn"
                add_field => {
                    "[geoip][autonomous_system]" => "%{[geoip][asn]}"
                }
            }
        }
    }

    # Decode the TCP flags bitmask, when present, into a label and a list of tags.
    if [netflow][tcp_flags] {
        # Label: looked up in the tcp_flags dictionary; values missing from the
        # dictionary fall back to the raw flags value itself.
        translate {
            id => "netflow-postproc-translate-tcp_flags_label"
            dictionary_path => "<%= ::File.join(LogStash::Environment::LOGSTASH_HOME, "/modules/netflow/configuration/logstash/dictionaries/tcp_flags.yml") %>"
            field => "[netflow][tcp_flags]"
            destination => "[netflow][tcp_flags_label]"
            fallback => "%{[netflow][tcp_flags]}"
        }

        # Tags: one entry per set bit, written to tcp_flag_tags as an array.
        # The flag names are listed from bit 0 (value 1, FIN) up to bit 7
        # (value 128, CWR), so the resulting array is always ordered
        # FIN, SYN, RST, PSH, ACK, URG, ECE, CWR.
        ruby {
            id => "netflow-postproc-ruby-tcp_flags_tags"
            code => "
                tcp_flags = event.get('[netflow][tcp_flags]').to_i
                flag_names = ['FIN', 'SYN', 'RST', 'PSH', 'ACK', 'URG', 'ECE', 'CWR']
                tags = flag_names.select.with_index { |_name, bit| (tcp_flags & (1 << bit)) > 0 }
                event.set('[netflow][tcp_flag_tags]', tags)
            "
        }
    }
}

# Ship the processed events to Elasticsearch. The body of this section is not written
# here: it is whatever text the elasticsearch_output_config() template helper returns
# when the template is rendered.
output {
<%= elasticsearch_output_config() %>
}
